1 package org.apache.lucene.replicator;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import java.io.IOException;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.Callable;
25
26 import org.apache.lucene.index.DirectoryReader;
27 import org.apache.lucene.index.IndexCommit;
28 import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
29 import org.apache.lucene.store.Directory;
30 import org.apache.lucene.store.IOContext;
31 import org.apache.lucene.util.InfoStream;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public class IndexAndTaxonomyReplicationHandler implements ReplicationHandler {
50
51
52
53
54
55 public static final String INFO_STREAM_COMPONENT = "IndexAndTaxonomyReplicationHandler";
56
57 private final Directory indexDir;
58 private final Directory taxoDir;
59 private final Callable<Boolean> callback;
60
61 private volatile Map<String,List<RevisionFile>> currentRevisionFiles;
62 private volatile String currentVersion;
63 private volatile InfoStream infoStream = InfoStream.getDefault();
64
65
66
67
68
69 public IndexAndTaxonomyReplicationHandler(Directory indexDir, Directory taxoDir, Callable<Boolean> callback)
70 throws IOException {
71 this.callback = callback;
72 this.indexDir = indexDir;
73 this.taxoDir = taxoDir;
74 currentRevisionFiles = null;
75 currentVersion = null;
76 final boolean indexExists = DirectoryReader.indexExists(indexDir);
77 final boolean taxoExists = DirectoryReader.indexExists(taxoDir);
78 if (indexExists != taxoExists) {
79 throw new IllegalStateException("search and taxonomy indexes must either both exist or not: index=" + indexExists
80 + " taxo=" + taxoExists);
81 }
82 if (indexExists) {
83 final IndexCommit indexCommit = IndexReplicationHandler.getLastCommit(indexDir);
84 final IndexCommit taxoCommit = IndexReplicationHandler.getLastCommit(taxoDir);
85 currentRevisionFiles = IndexAndTaxonomyRevision.revisionFiles(indexCommit, taxoCommit);
86 currentVersion = IndexAndTaxonomyRevision.revisionVersion(indexCommit, taxoCommit);
87 final InfoStream infoStream = InfoStream.getDefault();
88 if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
89 infoStream.message(INFO_STREAM_COMPONENT, "constructor(): currentVersion=" + currentVersion
90 + " currentRevisionFiles=" + currentRevisionFiles);
91 infoStream.message(INFO_STREAM_COMPONENT, "constructor(): indexCommit=" + indexCommit
92 + " taxoCommit=" + taxoCommit);
93 }
94 }
95 }
96
97 @Override
98 public String currentVersion() {
99 return currentVersion;
100 }
101
102 @Override
103 public Map<String,List<RevisionFile>> currentRevisionFiles() {
104 return currentRevisionFiles;
105 }
106
107 @Override
108 public void revisionReady(String version, Map<String,List<RevisionFile>> revisionFiles,
109 Map<String,List<String>> copiedFiles, Map<String,Directory> sourceDirectory) throws IOException {
110 Directory taxoClientDir = sourceDirectory.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE);
111 Directory indexClientDir = sourceDirectory.get(IndexAndTaxonomyRevision.INDEX_SOURCE);
112 List<String> taxoFiles = copiedFiles.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE);
113 List<String> indexFiles = copiedFiles.get(IndexAndTaxonomyRevision.INDEX_SOURCE);
114 String taxoSegmentsFile = IndexReplicationHandler.getSegmentsFile(taxoFiles, true);
115 String indexSegmentsFile = IndexReplicationHandler.getSegmentsFile(indexFiles, false);
116 String taxoPendingFile = taxoSegmentsFile == null ? null : "pending_" + taxoSegmentsFile;
117 String indexPendingFile = "pending_" + indexSegmentsFile;
118
119 boolean success = false;
120 try {
121
122 IndexReplicationHandler.copyFiles(taxoClientDir, taxoDir, taxoFiles);
123 IndexReplicationHandler.copyFiles(indexClientDir, indexDir, indexFiles);
124
125
126 if (!taxoFiles.isEmpty()) {
127 taxoDir.sync(taxoFiles);
128 }
129 indexDir.sync(indexFiles);
130
131
132
133
134 if (taxoSegmentsFile != null) {
135 taxoDir.copyFrom(taxoClientDir, taxoSegmentsFile, taxoPendingFile, IOContext.READONCE);
136 }
137 indexDir.copyFrom(indexClientDir, indexSegmentsFile, indexPendingFile, IOContext.READONCE);
138
139 if (taxoSegmentsFile != null) {
140 taxoDir.sync(Collections.singletonList(taxoPendingFile));
141 }
142 indexDir.sync(Collections.singletonList(indexPendingFile));
143
144 if (taxoSegmentsFile != null) {
145 taxoDir.renameFile(taxoPendingFile, taxoSegmentsFile);
146 }
147
148 indexDir.renameFile(indexPendingFile, indexSegmentsFile);
149
150 success = true;
151 } finally {
152 if (!success) {
153 if (taxoSegmentsFile != null) {
154 taxoFiles.add(taxoSegmentsFile);
155 taxoFiles.add(taxoPendingFile);
156 }
157 IndexReplicationHandler.cleanupFilesOnFailure(taxoDir, taxoFiles);
158 indexFiles.add(indexSegmentsFile);
159 indexFiles.add(indexPendingFile);
160 IndexReplicationHandler.cleanupFilesOnFailure(indexDir, indexFiles);
161 }
162 }
163
164
165 currentRevisionFiles = revisionFiles;
166 currentVersion = version;
167
168 if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
169 infoStream.message(INFO_STREAM_COMPONENT, "revisionReady(): currentVersion=" + currentVersion
170 + " currentRevisionFiles=" + currentRevisionFiles);
171 }
172
173
174
175
176
177
178 IndexReplicationHandler.cleanupOldIndexFiles(indexDir, indexSegmentsFile, infoStream);
179 IndexReplicationHandler.cleanupOldIndexFiles(taxoDir, taxoSegmentsFile, infoStream);
180
181
182
183 if (callback != null) {
184 try {
185 callback.call();
186 } catch (Exception e) {
187 throw new IOException(e);
188 }
189 }
190 }
191
192
193 public void setInfoStream(InfoStream infoStream) {
194 if (infoStream == null) {
195 infoStream = InfoStream.NO_OUTPUT;
196 }
197 this.infoStream = infoStream;
198 }
199
200 }